import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.sql.Timestamp;

/**
 * Example streaming job that reads comma-separated student records from a local
 * socket, registers them as a table with an event-time attribute, and prints the
 * results of three SQL queries (one group-by tumbling window, two OVER windows).
 *
 * <p>Each input line is expected to have exactly five comma-separated fields:
 * {@code name,sex,course,score,timestamp}, where {@code score} is parsed as a
 * {@code float} and {@code timestamp} as a {@code long} that is passed to
 * {@link Timestamp#Timestamp(long)} (i.e. milliseconds since the epoch).
 * Lines that do not match this shape are skipped.
 */
public class FlinkTableSQLStreamingExample
{

    /** Number of comma-separated fields every valid input line must contain. */
    private static final int EXPECTED_FIELD_COUNT = 5;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if executing the Flink job fails
     */
    public static void main(String[] args) throws Exception
    {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source: read newline-delimited text from a socket on 127.0.0.1:9999.
        DataStreamSource<String> textStudent = env.socketTextStream("127.0.0.1", 9999, "\n");

        // Convert each raw line into a StudentInfo for downstream processing;
        // malformed lines produce no output instead of failing the job.
        SingleOutputStreamOperator<StudentInfo> dataStreamStudent = textStudent.flatMap(new FlatMapFunction<String, StudentInfo>() {
            @Override
            public void flatMap(String s, Collector<StudentInfo> collector) {
                StudentInfo studentInfo = parseStudentInfo(s);
                if (studentInfo != null) {
                    collector.collect(studentInfo);
                }
            }
        });

        // Event timestamps come from the record itself, while the watermark trails
        // the wall clock by maxTimeLag.
        // NOTE(review): this presumably only yields sensible windows when record
        // timestamps are close to the current system time; records older than the
        // watermark are likely treated as late - confirm against the input data.
        DataStream<StudentInfo> dataStream = dataStreamStudent.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<StudentInfo>() {
            private final long maxTimeLag = 5000; // 5 seconds

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(System.currentTimeMillis() - maxTimeLag);
            }

            @Override
            public long extractTimestamp(StudentInfo studentInfo, long l) {
                return studentInfo.getTimestamp();
            }
        });

        // Register the timestamped stream as table "studentInfo"; sysDate is
        // declared as the rowtime attribute.
        tEnv.registerDataStream("studentInfo", dataStream, "name,sex,course,score,timestamp,sysDate.rowtime");

        tEnv.registerFunction("utc2local", new UTC2Local());

        // Group-by window aggregation: per name, sum the scores within each
        // 2-minute tumbling window and report the window start via utc2local.
        Table groupWindAggrTable = tEnv.sqlQuery("SELECT  " +
                "utc2local(TUMBLE_START(sysDate, INTERVAL '2' MINUTE)) as wStart, " +
                "name, SUM(score) as score " +
                "FROM  studentInfo " +
                "GROUP BY TUMBLE(sysDate, INTERVAL '2' MINUTE), name");
        DataStream<Row> groupWindAggrTableResult = tEnv.toAppendStream(groupWindAggrTable, Row.class);
        groupWindAggrTableResult.print();

        // OVER window aggregation: per name, count courses over the current row
        // and the two preceding rows (ordered by sysDate).
        Table overWindowAggr = tEnv.sqlQuery(
                "select name,count(course) over(" +
                        "   partition by name order by sysDate" +
                        "   rows between 2 preceding and current row" +
                        ")" +
                        "from studentInfo");
        DataStream<Row> overWindowAggrResult = tEnv.toAppendStream(overWindowAggr, Row.class);
        overWindowAggrResult.print();

        // Same OVER window expressed with a named WINDOW clause, computing both a
        // course count and a score sum; emitted as (flag, row) pairs.
        Table overWindowAggr2 = tEnv.sqlQuery(
                "select name,COUNT(course) OVER w ,SUM(score) OVER w " +
                        "from studentInfo " +
                        "window w as ( " +
                        "   partition by name " +
                        "   order by sysDate " +
                        "   rows between 2 preceding and current row " +
                        ") ");
        DataStream<Tuple2<Boolean, Row>> overWindowAggr2Result = tEnv.toRetractStream(overWindowAggr2, Row.class);
        overWindowAggr2Result.print();

        env.execute("studentScoreAnalyse");
    }

    /**
     * Parses one input line of the form {@code name,sex,course,score,timestamp}.
     *
     * @param line raw text line; may be {@code null} or blank
     * @return the populated {@code StudentInfo}, or {@code null} when the line is
     *         blank, does not have exactly {@value #EXPECTED_FIELD_COUNT} fields,
     *         or its score/timestamp field is not a valid number
     */
    private static StudentInfo parseStudentInfo(String line) {
        if (StringUtils.isBlank(line)) {
            return null;
        }
        String[] fields = line.split(",");
        if (fields.length != EXPECTED_FIELD_COUNT) {
            return null;
        }

        final float score;
        final long timestamp;
        try {
            score = Float.parseFloat(fields[3].trim());
            timestamp = Long.parseLong(fields[4].trim());
        } catch (NumberFormatException ignored) {
            // Treat unparsable numbers like any other malformed line: skip the
            // record rather than letting the exception fail the streaming job.
            return null;
        }

        StudentInfo studentInfo = new StudentInfo();
        studentInfo.setName(fields[0]);
        studentInfo.setSex(fields[1]);
        studentInfo.setCourse(fields[2]);
        studentInfo.setScore(score);
        studentInfo.setTimestamp(timestamp);
        // sysDate mirrors the record timestamp as a java.sql.Timestamp.
        studentInfo.setSysDate(new Timestamp(studentInfo.getTimestamp()));
        return studentInfo;
    }
}